{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Finding the cultural capital of Europe\n",
    "\n",
    "We are now ready for the big moment of finding which city is the cultural capital of Europe based on our criteria.\n",
    "\n",
    "## Perform Analysis on all the cultural features across all cities\n",
    "\n",
    "As we have seen in the previous chapter it's possible for us to perform a spaital join between two sets of geometries  for a given spatial predicate. This required the following three Steps:\n",
    "\n",
    "### Data preperation for doing Spatial Join via Spatial Spark\n",
    "\n",
    "* each feature in each set having a unique id\n",
    "* creating a dataframe with a tuple containing feature's unique id and it's geometry as a WKT string from each dataset\n",
    "* passing the Spark dataframe to the `BroadcastSpatialJoin` function provided by SpatialSpark \n",
    "\n",
    "The result of the `BroadcastSpatialJoin` is a list of tuples with the ID's of features that meet our join criteria. So essentially a join table for the two datasets. \n",
    "\n",
    "### Appending Properties back to the join results\n",
    "\n",
    "We then have to take these tuples and append our properties from each dataset namely for the cultural POIs labels and for the cities the city names and the population. \n",
    "\n",
    "### Doing analysis on join output\n",
    "\n",
    "We must then count how many of each type of cultural locations are there for each city. When we know how many museums, galleries, theaters and artworks there are in each city we can then calculated the weighted score for each location and then sum them for each city. \n",
    "\n",
    "So lets get started!"
   ]
  },
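  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The three steps above can be sketched without Spark at all. The following is a minimal, pure-Python illustration and not SpatialSpark's implementation: the cities are stand-in rectangles, and the `within` helper, the sample coordinates and all names are assumptions made up for this example.\n",
    "\n",
    "```python\n",
    "from collections import Counter\n",
    "\n",
    "# Hypothetical toy data: (id, name, population, (min_x, min_y, max_x, max_y))\n",
    "cities = [\n",
    "    (0, 'Alpha', 200000, (0.0, 0.0, 10.0, 10.0)),\n",
    "    (1, 'Beta', 50000, (20.0, 0.0, 30.0, 10.0)),\n",
    "]\n",
    "# Hypothetical toy data: (id, location_type, (x, y))\n",
    "pois = [\n",
    "    (0, 'museum', (1.0, 1.0)),\n",
    "    (1, 'theatre', (2.0, 5.0)),\n",
    "    (2, 'museum', (25.0, 5.0)),\n",
    "    (3, 'gallery', (50.0, 50.0)),  # lies outside every city\n",
    "]\n",
    "\n",
    "def within(point, box):\n",
    "    # Rectangle test standing in for a real 'within' spatial predicate\n",
    "    x, y = point\n",
    "    min_x, min_y, max_x, max_y = box\n",
    "    return min_x <= x <= max_x and min_y <= y <= max_y\n",
    "\n",
    "# Step 1: the spatial join yields only pairs of ids (a join table)\n",
    "join_table = [(poi_id, city_id)\n",
    "              for poi_id, _, point in pois\n",
    "              for city_id, _, _, box in cities\n",
    "              if within(point, box)]\n",
    "\n",
    "# Step 2: append the properties back using the ids\n",
    "poi_types = dict((poi_id, kind) for poi_id, kind, _ in pois)\n",
    "city_names = dict((city_id, name) for city_id, name, _, _ in cities)\n",
    "labelled = [(city_names[c], poi_types[p]) for p, c in join_table]\n",
    "\n",
    "# Step 3: count each location type per city\n",
    "counts = Counter(labelled)\n",
    "```\n",
    "\n",
    "Here `join_table` holds nothing but ID pairs, which is why the properties have to be looked up again before anything can be counted. The rest of this chapter does the same thing with Spark dataframes."
   ]
  },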
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# %load './code/helpers/imports.py'\n",
    "import os.path, json, io, pandas\n",
    "import matplotlib.pyplot as plt\n",
    "%matplotlib inline\n",
    "matplotlib.rcParams['figure.figsize'] = (54, 60)\n",
    "\n",
    "# for exponential back down when calling TurboOverdrive API\n",
    "from retrying import retry \n",
    "\n",
    "# resuse as func.coalace for example\n",
    "import pyspark.sql.functions as func \n",
    "from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DecimalType\n",
    "\n",
    "# for loading boundaries Data\n",
    "from geopandas import GeoDataFrame \n",
    "# for creating geospatial data\n",
    "from shapely.geometry import Point, Polygon, shape \n",
    "# for creating and parsing geospatial data in WKT and WKB format\n",
    "from shapely import wkb, wkt \n",
    "# for accessing OpenStreetMap API\n",
    "import overpy \n",
    "\n",
    "try:\n",
    "    sc\n",
    "except NameError:\n",
    "    import pyspark\n",
    "    sc = pyspark.SparkContext('local[*]')\n",
    "    sqlContext = pyspark.sql.SQLContext(sc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# features = pandas.read_json(POIS_PATH)\n",
    "# features['properties']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# %load './code/helpers/load_boundaries_and_pois.py'\n",
    "OVERPASS_API          = overpy.Overpass()\n",
    "BASE_DIR              = os.path.join(os.path.abspath('.'), 'work-flow')\n",
    "URBAN_BOUNDARIES_FILE = \"06_Europe_Cities_Boundaries_with_Labels_Population.geo.json\"\n",
    "\n",
    "# Paths to base datasets that we are using:\n",
    "URBAN_BOUNDARIES_PATH = os.path.join(BASE_DIR,URBAN_BOUNDARIES_FILE)\n",
    "POIS_PATH             = os.path.join(BASE_DIR, \"pois.json\")\n",
    "\n",
    "# Prepare data making sure that the WKT column exists\n",
    "try:\n",
    "    geo_df\n",
    "except NameError:\n",
    "    geo_df = GeoDataFrame.from_file(URBAN_BOUNDARIES_PATH)\n",
    "    # Add a WKT column for use later\n",
    "    geo_df['wkt'] = pandas.Series(\n",
    "        map(lambda geom: str(geom.to_wkt()), geo_df['geometry']),\n",
    "        index=geo_df.index, dtype='string')\n",
    "\n",
    "# Create a Boundries Spark Dataframe from GeoPandas \n",
    "try:\n",
    "    boundaries_from_pd\n",
    "except NameError:\n",
    "    boundaries_from_pd = sqlContext.createDataFrame(geo_df)\n",
    "    boundaries_from_pd.registerTempTable(\"boundaries\")\n",
    "\n",
    "# Create POI Spark Dataframe from GeoJSON Features data \n",
    "# where each feature is in it's own row\n",
    "try:\n",
    "    pois_df\n",
    "except NameError:\n",
    "    pois_df = sqlContext.read.json(POIS_PATH)\n",
    "    pois_df = pois_df.toPandas()\n",
    "    def toWktColumn(coords):\n",
    "        return (Point(coords).wkt)\n",
    "\n",
    "    pois_df['wkt'] = pandas.Series(\n",
    "        map(lambda geom: toWktColumn(geom.coordinates), pois_df['geometry']),\n",
    "        index=pois_df.index, dtype='string')\n",
    "\n",
    "    pois_df = sqlContext.createDataFrame(pois_df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Unique IDs\n",
    "As we saw in the introduction to Spatial Spark chapter, each record in our dataset needs to have unique ID in order to allow us to get the tuple matching our Spatial predicates. If you missed that see the previous chapter titled: _Spatial Spark_. \n",
    "\n",
    "So we will begin by doing the following:\n",
    "\n",
    "1. create a dataframe from a tuple with a unique id and WKT Geometry for the Cultural Points of Interests (POIs)\n",
    "2. create a dataframe from a tuple with a unique id and WKT Geometry for the City Boundaries\n",
    "\n",
    "This will then be passed to Spatial Sparks API to obtain a Spatial Join. To create the unique IDs we will use the function `monotonicallyIncreasingId` provided by PySpark."
   ]
  },
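  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It helps to know what these IDs look like before using them as join keys. As far as the Spark documentation describes the current implementation, the partition ID goes into the upper 31 bits and the record number within the partition into the lower 33 bits, so the IDs are unique and increasing but not consecutive. The pure-Python sketch below only emulates that layout; `emulate_monotonic_ids` is a hypothetical helper, not a PySpark function.\n",
    "\n",
    "```python\n",
    "def emulate_monotonic_ids(partitions):\n",
    "    # partitions: a list of lists of rows, one inner list per partition\n",
    "    ids = []\n",
    "    for partition_index, rows in enumerate(partitions):\n",
    "        for record_number, row in enumerate(rows):\n",
    "            # partition index in the high bits, record number in the low 33 bits\n",
    "            ids.append(((partition_index << 33) + record_number, row))\n",
    "    return ids\n",
    "\n",
    "example = emulate_monotonic_ids([['a', 'b', 'c'], ['d', 'e']])\n",
    "# The first partition gets 0, 1, 2; the second starts at 1 << 33 = 8589934592\n",
    "```\n",
    "\n",
    "Gaps like these are harmless here because the IDs are only used to match rows, never as positions."
   ]
  },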
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import monotonicallyIncreasingId\n",
    "\n",
    "# create dataframe with (id, geometry) for POIs\n",
    "# 1. Add and ID Column to POIs\n",
    "\n",
    "pois_df           = pois_df.withColumn(\"id\", monotonicallyIncreasingId())\n",
    "pois_tuple_id_wkt = pois_df.select(pois_df['id'], pois_df['wkt'])\n",
    "\n",
    "pois_tuple_id_wkt.show()\n",
    "pois_tuple_id_wkt.printSchema()\n",
    "print pois_tuple_id_wkt.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# create dataframe with (id, geometry-as-WKT) for boundaries\n",
    "# 1. Add and ID Column to boundaries\n",
    "\n",
    "boundaries_from_pd      = boundaries_from_pd.withColumn(\"id\", monotonicallyIncreasingId())\n",
    "boundaries_tuple_id_wkt = boundaries_from_pd.select(boundaries_from_pd['id'], boundaries_from_pd['wkt'])\n",
    "\n",
    "boundaries_tuple_id_wkt.printSchema()\n",
    "print boundaries_from_pd.count()\n",
    "boundaries_tuple_id_wkt.show()\n",
    "wkt.loads(boundaries_tuple_id_wkt.take(7)[6].wkt)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "spatialspark = sc._jvm.spatialspark\n",
    "SpatialOperator      = spatialspark.operator.SpatialOperator \n",
    "BroadcastSpatialJoin = spatialspark.join.BroadcastSpatialJoin\n",
    "from ast import literal_eval as make_tuple # used to decode data from java\n",
    "\n",
    "joinPoiBdryRDD = BroadcastSpatialJoin.apply(sc._jsc, \n",
    "                                            pois_tuple_id_wkt._jdf, \n",
    "                                            boundaries_tuple_id_wkt._jdf, \n",
    "                                            SpatialOperator.Within(), \n",
    "                                            0.0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "print joinPoiBdryRDD.count()\n",
    "\n",
    "joinResults = map(lambda result: make_tuple(result.toString()), joinPoiBdryRDD.collect())\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "print pois_tuple_id_wkt.count()\n",
    "print boundaries_tuple_id_wkt.count()\n",
    "print joinResults[695]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# make the results a DF\n",
    "rddResult = sc.parallelize(joinResults)\n",
    "df = sqlContext.createDataFrame(rddResult, [\"poi_id\", \"boundry_id\"])\n",
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Do a join with poi df and bdry df\n",
    "df_with_pois = df.join(pois_df, df['poi_id'] == pois_df['id']).select(\n",
    "    df['poi_id'],\n",
    "    df['boundry_id'],\n",
    "    pois_df['properties'].alias(\"poi_properties\"),    \n",
    "    pois_df['wkt'].alias(\"poi_wkt\")\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "df_with_pois.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_with_pois_bdrys = df_with_pois.join(\n",
    "        boundaries_from_pd, df_with_pois['boundry_id'] == boundaries_from_pd['id']\n",
    "    ).select(\n",
    "        df_with_pois['poi_id'],\n",
    "        df_with_pois['boundry_id'],\n",
    "        df_with_pois['poi_properties'],\n",
    "        df_with_pois['poi_wkt'],\n",
    "        boundaries_from_pd['wkt'].alias(\"boundry_wkt\"),\n",
    "        boundaries_from_pd['NAMEASCII'].alias(\"city_name\"),\n",
    "        boundaries_from_pd['POPEU2013'].alias(\"population\")\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "df_with_pois_bdrys.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "df_with_pois_bdrys.cache()\n",
    "df_with_pois_bdrys.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "df_with_pois_bdrys.show(3)\n",
    "# we have now"
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## what have we done so far\n",
    "\n",
    "We have now created a join table with each POI and corresponding city along with the \n",
    "population and city name POI's have a nested property thanks to geojson with keys for \n",
    "the type of location. We will still need a way to decipher these.\n",
    "\n",
    "The other issue is that the tag for location type is split across amenity and tourism. \n",
    "To simplify our our calculation we will create a column for location type and have the label there."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "rec = df_with_pois_bdrys.take(1)[0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "print rec.poi_properties.amenity or rec.poi_properties.tourism"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# For the first step we add columns for each tag. In this case we have amenity and tourism\n",
    "\n",
    "df_with_pois_bdrys = df_with_pois_bdrys.withColumn(\n",
    "        'tourism', df_with_pois_bdrys['poi_properties']['tourism']\n",
    "    ).withColumn(\n",
    "        'amenity', df_with_pois_bdrys['poi_properties']['amenity'])\n",
    "\n",
    "df_with_pois_bdrys.columns"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We next coalesce the two columns into a single column called `location_type` and then group our data by all the fields we need in this case: `city_name`, `population`, `location_type` and then perform a count."
   ]
  },
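  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For readers new to `coalesce`, the idea can be shown in plain Python first. This is only a sketch with made-up rows; the `coalesce` helper below is a hypothetical stand-in for the Spark function and simply returns its first argument that is not `None`.\n",
    "\n",
    "```python\n",
    "from collections import Counter\n",
    "\n",
    "# Hypothetical rows: (city_name, tourism tag, amenity tag); None means the tag is absent\n",
    "rows = [\n",
    "    ('Alpha', 'museum', None),\n",
    "    ('Alpha', None, 'theatre'),\n",
    "    ('Alpha', 'museum', None),\n",
    "    ('Beta', 'gallery', 'arts_centre'),\n",
    "]\n",
    "\n",
    "def coalesce(*values):\n",
    "    # Return the first value that is not None, like SQL COALESCE\n",
    "    for value in values:\n",
    "        if value is not None:\n",
    "            return value\n",
    "    return None\n",
    "\n",
    "# Group by (city, location_type) and count the rows in each group\n",
    "counts = Counter((city, coalesce(tourism, amenity))\n",
    "                 for city, tourism, amenity in rows)\n",
    "```\n",
    "\n",
    "Note that the argument order matters: when a POI carries both tags, the `tourism` value wins because it is passed first."
   ]
  },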
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df = df_with_pois_bdrys.select('*', func.coalesce(\n",
    "        df_with_pois_bdrys['poi_properties']['tourism'], \n",
    "        df_with_pois_bdrys['poi_properties']['amenity']\n",
    "    ).alias(\"location_type\")).groupby('boundry_id', \n",
    "                                      'city_name', \n",
    "                                      'location_type', \n",
    "                                      'population').count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "df.show(10)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This gives us the **count per location_type per city**. Next we need the weighted score for each location type and then perform a grouped sum of the **score** for each city. \n",
    "\n",
    "In order to perform the grouped sum we will create a UDF (user defined function) that given a `location_type`, `count` of the location and `population` for the city in question will return a single score value weighted by the location type and normalised by the population."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Create an UDF for a column that calculates the score per record\n",
    "\n",
    "def get_cultural_score(location_type, count, population):\n",
    "    cultural_weight_lookup = { \n",
    "        u'museum':      3.0,\n",
    "        u'arts_centre': 2.0,\n",
    "        u'theatre':     2.0,\n",
    "        u'gallery':     2.0,\n",
    "        u'artwork':     0.0  # try modifying the weights as an exercise\n",
    "    }\n",
    "\n",
    "    wgt = cultural_weight_lookup.get(location_type, 0.0)\n",
    "\n",
    "    return round(float((wgt* float(count) * 100000)/float(population)), 2)\n",
    "\n",
    "sqlContext.registerFunction(\"get_cultural_score\", get_cultural_score, FloatType())7\n",
    "\n",
    "# score_udf = func.udf(get_cultural_score, FloatType())"
   ]
  },
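  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the formula concrete, here is a small worked example in plain Python. The weights are copied from the cell above; the city with 500,000 inhabitants and its counts are hypothetical, and the `score` function is a simplified stand-in for the UDF rather than the registered function itself.\n",
    "\n",
    "```python\n",
    "WEIGHTS = {'museum': 3.0, 'arts_centre': 2.0, 'theatre': 2.0,\n",
    "           'gallery': 2.0, 'artwork': 0.0}\n",
    "\n",
    "def score(location_type, count, population):\n",
    "    # weight * count, expressed per 100,000 inhabitants\n",
    "    weight = WEIGHTS.get(location_type, 0.0)\n",
    "    return round(weight * count * 100000.0 / population, 2)\n",
    "\n",
    "# Hypothetical city and counts\n",
    "population = 500000\n",
    "counts = {'museum': 10, 'theatre': 5, 'artwork': 40}\n",
    "\n",
    "per_type = dict((kind, score(kind, n, population)) for kind, n in counts.items())\n",
    "final_score = round(sum(per_type.values()), 2)\n",
    "# museum: 3.0 * 10 * 100000 / 500000 = 6.0\n",
    "# theatre: 2.0 * 5 * 100000 / 500000 = 2.0\n",
    "# artwork: weight 0.0, so it contributes 0.0\n",
    "# final_score = 8.0\n",
    "```\n",
    "\n",
    "Normalising by population keeps large cities from winning purely on size, and a weight of 0.0 removes a location type from the ranking altogether."
   ]
  },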
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# score_df = df.select(df.boundry_id, \n",
    "#           df.city_name, \n",
    "#           df.population,\n",
    "#           df.location_type,\n",
    "#           df.count,\n",
    "#           score_udf(df.location_type, df.count, df.population).alias('cultural_score')\n",
    "#          )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "df.registerTempTable(\"cultural_score\")\n",
    "\n",
    "score_df = sqlContext.sql(\n",
    "    \"SELECT boundry_id, \\\n",
    "        city_name, \\\n",
    "        location_type, \\\n",
    "        population, \\\n",
    "        count, get_cultural_score(location_type, count, population) as score \\\n",
    "    FROM cultural_score\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "score_df.sort(score_df['city_name'].asc()).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "df = (score_df\n",
    "        .groupBy(\"boundry_id\", \"city_name\", \"population\")\n",
    "        .agg(func.round(func.sum(score_df['score']), 2).alias('final_score'))\n",
    "        .sort(\"city_name\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "df.show(4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "df.collect()\n",
    "(df\n",
    " .select(df['city_name'], df['population'], df['final_score'])\n",
    " .sort(df['final_score'])\n",
    " .show(df.count()))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Visualizing the data\n",
    "\n",
    "Let map the data using the Folium package that allows embedding Leaflet Maps inside ipython notebooks.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "pd_df = df_with_pois_bdrys.toPandas()\n",
    "pd_df.head(2)\n",
    "df_with_pois_bdrys.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "pd_df = pd_df[['boundry_id', 'boundry_wkt', 'city_name', 'population']]\n",
    "pd_df = pd_df.drop_duplicates()\n",
    "pd_df.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Geometry list\n",
    "geometry = [wkt.loads(boundry_wkt) for boundry_wkt in pd_df.boundry_wkt]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# create a GeodataFrame\n",
    "geodf = GeoDataFrame(pd_df, geometry=geometry)\n",
    "geodf.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "scores = pd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "scores.head()\n",
    "scores_merged_df = pandas.DataFrame.merge(geodf, scores, on='boundry_id')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "scores_merged_df.head(1)\n",
    "geo_scores_merged = GeoDataFrame(scores_merged_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "geo_scores_merged.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "import folium\n",
    "map_osm = folium.Map(location=[47.19094, 11.98566], \n",
    "    tiles='Mapbox Bright',\n",
    "    zoom_start=6)\n",
    "\n",
    "map_osm.choropleth(geo_str=geo_scores_merged.to_json(),\n",
    "              data=geo_scores_merged,\n",
    "              columns=['city_name_x', 'final_score'],\n",
    "              fill_color='RdBu',\n",
    "              key_on='properties.city_name_x')\n",
    "map_osm"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.9"
  },
  "toc": {
   "toc_cell": false,
   "toc_number_sections": true,
   "toc_threshold": 6,
   "toc_window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
